{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Feature Transformation with Amazon a SageMaker Processing Job and Apache Spark\n",
    "\n",
    "Typically a machine learning (ML) process consists of few steps. First, gathering data with various ETL jobs, then pre-processing the data, featurizing the dataset by incorporating standard techniques or prior knowledge, and finally training an ML model using an algorithm.\n",
    "\n",
    "Often, distributed data processing frameworks such as Apache Spark are used to pre-process data sets in order to prepare them for training. In this notebook we'll use Amazon SageMaker Processing, and leverage the power of Apache Spark in a managed SageMaker environment to run our processing workload."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "![](img/prepare_dataset_bert.png)\n",
    "\n",
    "![](img/processing.jpg)\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Setup Environment\n",
    "\n",
    "Let's start by specifying:\n",
    "* The S3 bucket and prefixes that you use for training and model data. Use the default bucket specified by the Amazon SageMaker session.\n",
    "* The IAM role ARN used to give processing and training access to the dataset."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import sagemaker\n",
    "from time import gmtime, strftime\n",
    "import boto3\n",
    "\n",
    "sagemaker_session = sagemaker.Session()\n",
    "role = sagemaker.get_execution_role()\n",
    "bucket = sagemaker_session.default_bucket()\n",
    "region = boto3.Session().region_name"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Setup Input Data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Inputs\n",
    "s3_input_data = 's3://{}/amazon-reviews-pds/tsv/'.format(bucket)\n",
    "print(s3_input_data)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!aws s3 ls $s3_input_data"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Run the Job using a SageMaker Processing Job\n",
    "\n",
    "Next, use the Amazon SageMaker Python SDK to submit a processing job. Use the Spark container that was just built, and a Spark ML script for processing in the job configuration."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Review the Spark processing script."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!pygmentize preprocess-spark-text-to-bert.py"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from sagemaker.spark.processing import PySparkProcessor\n",
    "\n",
    "processor = PySparkProcessor(base_job_name='spark-amazon-reviews-processor',\n",
    "                             role=role,\n",
    "                             framework_version='2.4',\n",
    "#                             py_version='py37',\n",
    "#                             container_version='v1.0',\n",
    "                             instance_count=2,\n",
    "                             instance_type='ml.r5.xlarge',\n",
    "                             max_runtime_in_seconds=7200)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Setup Output Data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from time import gmtime, strftime\n",
    "timestamp_prefix = strftime(\"%Y-%m-%d-%H-%M-%S\", gmtime())\n",
    "\n",
    "output_prefix = 'amazon-reviews-spark-processor-{}'.format(timestamp_prefix)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "train_data_bert_output = 's3://{}/{}/output/bert-train'.format(bucket, output_prefix)\n",
    "validation_data_bert_output = 's3://{}/{}/output/bert-validation'.format(bucket, output_prefix)\n",
    "test_data_bert_output = 's3://{}/{}/output/bert-test'.format(bucket, output_prefix)\n",
    "\n",
    "print(train_data_bert_output)\n",
    "print(validation_data_bert_output)\n",
    "print(test_data_bert_output)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from sagemaker.processing import ProcessingOutput\n",
    "\n",
    "processor.run(submit_app='preprocess-spark-text-to-bert.py',\n",
    "              arguments=['s3_input_data', s3_input_data,\n",
    "                         's3_output_train_data', train_data_bert_output,\n",
    "                         's3_output_validation_data', validation_data_bert_output,\n",
    "                         's3_output_test_data', test_data_bert_output,                         \n",
    "              ],\n",
    "              # We need to define these outputs so we can later call  \n",
    "              #    ProcessingJob.from_processing_name() \n",
    "              # to describe the job and poll for Completed status\n",
    "              outputs=[\n",
    "                       ProcessingOutput(s3_upload_mode='EndOfJob',\n",
    "                                        output_name='bert-train',\n",
    "                                        source='/opt/ml/processing/output/bert/train'),\n",
    "                       ProcessingOutput(s3_upload_mode='EndOfJob',\n",
    "                                        output_name='bert-validation',\n",
    "                                        source='/opt/ml/processing/output/bert/validation'),\n",
    "                       ProcessingOutput(s3_upload_mode='EndOfJob',\n",
    "                                        output_name='bert-test',\n",
    "                                        source='/opt/ml/processing/output/bert/test'),\n",
    "              ],          \n",
    "              logs=True,\n",
    "              wait=False\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "spark_processing_job_name = processor.jobs[-1].describe()['ProcessingJobName']\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "from IPython.core.display import display, HTML\n",
    "\n",
    "display(HTML('<b>Review <a target=\"blank\" href=\"https://console.aws.amazon.com/sagemaker/home?region={}#/processing-jobs/{}\">Processing Job</a></b>'.format(region, spark_processing_job_name)))\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from IPython.core.display import display, HTML\n",
    "\n",
    "display(HTML('<b>Review <a target=\"blank\" href=\"https://console.aws.amazon.com/cloudwatch/home?region={}#logStream:group=/aws/sagemaker/ProcessingJobs;prefix={};streamFilter=typeLogStreamPrefix\">CloudWatch Logs</a> After About 5 Minutes</b>'.format(region, spark_processing_job_name)))\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from IPython.core.display import display, HTML\n",
    "\n",
    "# This is different than the job name because we are not using ProcessingOutput's in this Spark ML case.\n",
    "spark_processing_job_s3_output_prefix = output_prefix\n",
    "\n",
    "display(HTML('<b>Review <a target=\"blank\" href=\"https://s3.console.aws.amazon.com/s3/buckets/{}/{}/?region={}&tab=overview\">S3 Output Data</a> After The Processing Job Has Completed</b>'.format(bucket, spark_processing_job_s3_output_prefix, region)))\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Please Wait Until the Processing Job Completes\n",
    "Re-run this next cell until the job status shows `Completed`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "running_processor = sagemaker.processing.ProcessingJob.from_processing_name(processing_job_name=spark_processing_job_name,\n",
    "                                                                            sagemaker_session=sagemaker_session)\n",
    "\n",
    "processing_job_description = running_processor.describe()\n",
    "\n",
    "processing_job_status = processing_job_description['ProcessingJobStatus']\n",
    "print('\\n')\n",
    "print(processing_job_status)\n",
    "print('\\n')\n",
    "\n",
    "print(processing_job_description)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# _Please Wait Until the ^^ Processing Job ^^ Completes Above._"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "running_processor.wait()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Inspect the Processed Output Dataset\n",
    "\n",
    "\n",
    "## _The next cells will not work properly until the job completes above._\n",
    "\n",
    "\n",
    "Take a look at a few rows of the transformed dataset to make sure the processing was successful."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "!aws s3 ls --recursive $train_data_bert_output/"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!aws s3 ls --recursive $validation_data_bert_output/"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "!aws s3 ls --recursive $test_data_bert_output/"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%javascript\n",
    "Jupyter.notebook.save_checkpoint();\n",
    "Jupyter.notebook.session.delete();"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "conda_python3",
   "language": "python",
   "name": "conda_python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.10"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
